package threadPool.thread;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import threadPool.task.*;

import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Thread pool built on Disruptor-backed message executors.
 * <p>
 * Executors are registered by name through
 * {@link #createDisruptorProcessor(String, int)} and can afterwards be looked
 * up by name or handed out in round-robin order. A single scheduled pool,
 * shared by every instance of this class, runs the timer tasks.
 *
 * @author jinmiao
 * 2014-9-12 9:51:09 AM
 */
public class BaseDisruptorExecutorPool 
{
	private static final Logger log = LoggerFactory.getLogger(BaseDisruptorExecutorPool.class); 

	/** Registered executors in creation order; used for round-robin selection and shutdown. */
	protected List<IMessageExecutor> executor = new Vector<IMessageExecutor>();
	
	/** Registered executors keyed by the name they were created with. */
	protected Map<String, IMessageExecutor> executorMap = new ConcurrentHashMap<String, IMessageExecutor>();
	
	/** Monotonic counter that drives round-robin selection. */
	protected AtomicInteger index = new AtomicInteger();

	/** Scheduled pool for timer tasks; static, so it is shared by all pool instances. */
	private static final ScheduledThreadPoolExecutor scheduled  = new ScheduledThreadPoolExecutor(3,new TimerThreadFacotry());


	/**
	 * Returns the shared scheduled pool used for timer tasks.
	 *
	 * @return the static scheduled executor
	 */
	public static ScheduledThreadPoolExecutor getScheduled() {
		return scheduled;
	}


	/** Thread factory that names timer threads "TimerThread 1", "TimerThread 2", and so on. */
	private static class TimerThreadFacotry implements ThreadFactory
	{
		private final AtomicInteger timeThreadName=new AtomicInteger(0);

		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r,"TimerThread "+timeThreadName.incrementAndGet());
		}
	}

	/**
	 * Creates, registers and starts an executor under the given name.
	 * <p>
	 * A single-threaded executor is created when {@code threadNum} is less
	 * than 2, otherwise a multi-threaded one with {@code threadNum} threads.
	 *
	 * @param threadName unique name of the executor
	 * @param threadNum  requested number of threads
	 * @return the started executor
	 * @throws RuntimeException if an executor with the same name is already registered
	 */
	protected IMessageExecutor createDisruptorProcessor(String threadName,int threadNum)
	{
		// Cheap pre-check so the common duplicate case never constructs an executor.
		if(executorMap.containsKey(threadName))
			throw new RuntimeException("有相同名称的线程池");
		IMessageExecutor process;
		if(threadNum<2)
		{
			process = new DisruptorSingleExecutor(threadName);
		}
		else
		{
			process = new DisruptorMultiExecutor(threadName,threadNum);
		}
		// putIfAbsent closes the check-then-act window: if another thread registered
		// the same name in the meantime, this (not yet started) executor is discarded.
		if(executorMap.putIfAbsent(threadName, process)!=null)
			throw new RuntimeException("有相同名称的线程池");
		executor.add(process);
		process.start();
		return process;
	}


	/**
	 * Runs {@code async} on the named executor and delivers the outcome back to
	 * the message executor of the calling thread.
	 * <p>
	 * The caller must be a {@link DisruptorThread}; otherwise an error is
	 * logged and nothing is executed. On success {@code back} receives the
	 * result; if anything in the asynchronous step throws, the error is logged
	 * and passed to {@code callBackException}.
	 *
	 * @param executeName       name of the executor that runs the asynchronous step
	 * @param async             task producing the result
	 * @param back              callback receiving the result on the caller's executor
	 * @param callBackException handler receiving the failure on the caller's executor;
	 *                          when {@code null} the failure is only logged
	 * @param <T>               result type
	 */
	public <T> void call(String executeName,IAsyncTask<T> async , ICallBackTask<T> back,ICallBackException callBackException)
	{
		IMessageExecutor process = executorMap.get(executeName);
		if(process==null)
		{
			log.error("threadService is not exist,threadServiceName = {}",executeName);
			return;
		}
		Thread currentThread = Thread.currentThread();
		if(!(currentThread instanceof DisruptorThread)){
			log.error("current is not DisruptorThread");
			return;
		}
		final DisruptorThread caller = (DisruptorThread) currentThread;
		process.execute(() ->{
				try {
					T result = async.async();
					caller.getMessageExecutor().execute(() ->
							back.callBack(result)
					);
				}catch (Throwable error){
					log.error("async error",error);
					// The failure is already logged; without a handler there is nothing
					// to dispatch, and dispatching would only raise an NPE on the caller.
					if(callBackException!=null)
					{
						caller.getMessageExecutor().execute(() ->
								callBackException.handException(error)
						);
					}
				}
			}
		);
	}


	/**
	 * Publishes a task to the executor named by the task itself.
	 * <p>
	 * When the task has no executor name it is executed synchronously on the
	 * calling thread. When the named executor does not exist an error is logged
	 * and the task is dropped.
	 *
	 * @param task may be a callback task {@link DisruptorAsyncBackTask} or a {@link DisruptorAsyncTask}
	 * @throws Throwable whatever the task throws when executed inline
	 */
	public void publishTask(OutTask task) throws Throwable
	{
		String executeName = task.getExecuteName();
		if(executeName==null)
		{
			task.execute();
			return;
		}
		IMessageExecutor process = executorMap.get(executeName);
		if(process==null)
		{
			log.error("threadService is not exist,threadServiceName = {}",executeName);
			return;
		}
		process.execute(task);
	}


	/**
	 * Submits a task to the named executor.
	 * <p>
	 * When {@code threadName} is {@code null} the task is executed synchronously
	 * on the calling thread. When the named executor does not exist an error is
	 * logged and the task is dropped.
	 *
	 * @param threadName name of the target executor, or {@code null} to run inline
	 * @param task       task to run
	 * @throws Throwable whatever the task throws when executed inline
	 */
	public void call(String threadName,ITask task) throws Throwable
	{
		if(threadName==null)
		{
			task.execute();
			return;
		}
		IMessageExecutor process = executorMap.get(threadName);
		if(process==null)
		{
			log.error("threadService is not exist,threadServiceName = {}",threadName);
			return;
		}
		process.execute(task);
	}


	/**
	 * Schedules a timer task at a fixed rate on the shared scheduled pool.
	 * <p>
	 * The task's interval (milliseconds) is used both as the initial delay and
	 * as the period. The resulting future is stored on the task.
	 *
	 * @param task timer task to schedule
	 */
	public void pushTimerTask(DistriptorTimerTask task)
	{
		long intervalMill = task.getIntervalMill();
		ScheduledFuture<?> future = scheduled.scheduleAtFixedRate(task,
				intervalMill, intervalMill,
				TimeUnit.MILLISECONDS);
		task.setFuture(future);
	}


	/**
	 * Stops every registered executor and shuts down the scheduled pool.
	 * <p>
	 * The scheduled pool is static, so this also stops timer tasks submitted
	 * through any other instance of this class.
	 */
	public void stop()
	{
		for(IMessageExecutor process:executor)
		{
			process.stop();
		}

		if(!scheduled.isShutdown())
			scheduled.shutdown();
	}


	/**
	 * Picks a registered executor in round-robin order.
	 *
	 * @return the next executor in rotation
	 * @throws IllegalStateException if no executor has been created yet
	 */
	public IMessageExecutor getAutoDisruptorProcessor()
	{
		int size = executor.size();
		if(size==0)
			throw new IllegalStateException("no executor has been created");
		int next = this.index.incrementAndGet();
		// floorMod keeps the slot non-negative after the counter overflows past
		// Integer.MAX_VALUE; a plain % would yield a negative index there.
		return executor.get(Math.floorMod(next, size));
	}


	/**
	 * Looks up an executor by name.
	 *
	 * @param threadName name the executor was created with
	 * @return the executor, or {@code null} if none is registered under that name
	 */
	public IMessageExecutor getProcessByName(String threadName)
	{
		return executorMap.get(threadName);
	}

}
